/**
 * 
 */
package shellInterface;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Field;
import java.math.BigInteger;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.sql.SQLException;
import java.text.ParseException;
import java.time.Year;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Scanner;
import java.util.regex.Pattern;

import org.json.JSONArray;
import org.json.JSONException;

import fetcher.SparkConfiguration;
import graybox.GrayBoxConf;
import huristic.HuristicConf;
import mysql.DB;
import others.Global;
import others.Global.SparkMode;


/**
*@author tonychao 
*功能 用户和Spark Submit之间的任务交互
*用户提交任务，由Daemon进程进行任务处理 然后使用Spark-submit进行处理
*/
public class Temp {
	static int inputFileSize;
	static SparkMode sparkMode;

	public static void main(String[] args) throws FileNotFoundException, JSONException, ParseException {
		/**
		*创建时间：2017年7月16日
		*这是一个由用户调用的程序
		*/
		UIOutPut("当前的操作系统是"+OSinfo.getFullOSName()+"\t版本号:"+System.getProperty("os.version")+"\t架构:"+System.getProperty("os.arch"));
		
		//解析并执行命令
        try {
			parseArgs(args);
		} catch (IOException | InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (SQLException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
        System.exit(0);
        
        
        Scanner scanner = new Scanner(System.in);  
        String Command =null;
		while(true){
			Command = scanner.nextLine();
            if(Command.equals("END"))break;
            System.out.println(Command);
        }
        scanner.close();
	}
	
	/**
	 * file_size_[input_file_size_IN_MB] spark-submit  [options] <app jar | python file> [app arguments]
	*创建时间：2017年7月16日
	*输入
	*输出
	*用于解析用户的命令
	 * @throws JSONException 
	 * @throws ParseException 
	*/
	@SuppressWarnings("unused")
	private static void parseArgs(String[] args) throws IOException, InterruptedException, SQLException, JSONException, ParseException {
		inputFileSize=-1;
		boolean app_flag=false;
		String application_path=null,className=null;
		ArrayList<String> argList= new ArrayList<String>(Arrays.asList(args));
		//for (String term : argList) System.out.println(term);
		
		ArrayList<String> rawSparkOptions= new ArrayList<String>();
		ArrayList<String> appArguments = new ArrayList<String>();
		for (String arg : argList){
			if (arg.equals("spark-submit")) continue;
			else if (arg.matches("^file_size_.+")){
				arg=arg.substring(10, arg.length());
				inputFileSize=Integer.parseInt(arg);
			}
			//因为分割字符串的坑爹方法，所以要过滤 backslash
			else if (arg.equals("\\")) continue;
			//如果是提交任务
			else if (arg.matches(".*\\.jar")||arg.matches(".*\\.py")){
				app_flag=true;
				application_path=arg;
			}
			//任务已提交，说明剩下的是app 的自己的参数，不能够修改，原样输出
			else if(app_flag){
				appArguments.add(arg);
			}
			//任务未提交 说明是spark的配置参数
			else{
				rawSparkOptions.add(arg);
			}
		}
		
		//对options 进行修改，因为option 是一个字符串 所以--class tonychaotest.id.App  \
		ArrayList<String> userDefinedParameters=(ArrayList<String>) rawSparkOptions.clone();

		for (int i=0;i<userDefinedParameters.size();i++){
			String term= userDefinedParameters.get(i);
			if (term.equals("--conf")){	//配置
				userDefinedParameters.remove(i);
				i--;
			}
			else if (term.equals("--class")) { //对于一个jar包打包的Spark应用而言，整体由 jar包+class 决定了实际执行的程序
				className=userDefinedParameters.get(i+1); //如果是spark class 类
				UIOutPut("get class name: " +className);
				userDefinedParameters.remove(i);		//delete --class 以及 class 的内容
				userDefinedParameters.remove(i);
				i--;
			}
			else if (term.equals("--master")){
				String tmp=userDefinedParameters.get(i+1);
				switch (tmp) {
				case "yarn":
					sparkMode=Global.SparkMode.SPARK_ON_YARN;
					break;
				default:
					sparkMode=Global.SparkMode.SPARK_STAND_ALONE;
					break;}
				
				i++;
			}
		}
		//System.out.println(optList);
		//System.out.println(rawOptions);
		
		
		//这说明没有找到提交的任务文件，执行失败，弹出错误信息
		if (!app_flag){
			UIErr("命令不符合规范！");
			UIErr("用法："+" file_size_[input_file_size_IN_MB] spark-submit [options] <app jar | python file> [app arguments]");
			System.exit(1);
		}

//		//如果是其他命令
//		else {
//			UIErr("synatx error ! /n"
//					+ "usage:spark-tune-submit [options] <app jar | python file> [app arguments]");
//			//规范格式提示
//		}
		
		DB db= new DB();
		try {
			db.initConnection();
		} catch (ClassNotFoundException e1) {
			// TODO Auto-generated catch block
			e1.printStackTrace();
		} catch (SQLException e1) {
			// TODO Auto-generated catch block
			e1.printStackTrace();
		}
		
		
		//提交任务
		//提交的任务未注册
		String programMD5 = null;
		try {
			programMD5 = getMd5ByPath(application_path);
			UIOutPut("get application MD5：\t"+programMD5);
		} catch (NoSuchAlgorithmException e) {
			// TODO Auto-generated catch block
			UIErr("无法计算MD5！");
			e.printStackTrace();
		}
		
		if (!Global.USE_DB_Flag){
			UIOutPut("不使用数据库进行比较，单任务测试模式");
			String ret=callSparkSubmit(programMD5, className,rawSparkOptions,application_path,appArguments,"direct"); //direct:仅仅使用启发式优化参数
		}

		else if (!db.isRegisted(programMD5,className)){
			UIOutPut("此应用未注册");
			db.registe(programMD5,className);
			String ret=callSparkSubmit(programMD5,className,rawSparkOptions,application_path,appArguments,"direct");
			String app_id= parseReturnData(ret);
			if(!app_id.equals("-1")) db.addRunRecord(programMD5,className,app_id,Global.WAIT_TIME_IN_MS);
			
		}
		//提交的任务已经建立模型
		else if(db.hasModel(programMD5,className)){
			UIOutPut("此应用已注册，且有模型。可以进行优化"); 
			double accRatio=db.getaccRatio(programMD5,className);
			UIOutPut("*********预计减少运行时间:"+(int)(accRatio*100)+"***********" );
			String ret=callSparkSubmit(programMD5, className,rawSparkOptions,application_path,appArguments,"graybox");//graybox:使用启发式优化+黑盒优化技术
			String app_id= parseReturnData(ret);
			if(!app_id.equals("-1"))  db.addRunRecord(programMD5,className,app_id,Global.WAIT_TIME_IN_MS);
		}
		
		//提交的任务已注册但是未建立模型
		else{
			UIOutPut("此应用已注册，但没有模型");
			String ret=callSparkSubmit(programMD5, className,rawSparkOptions,application_path,appArguments,"direct");;
			String app_id= parseReturnData(ret);
			if(!app_id.equals("-1"))  db.addRunRecord(programMD5,className,app_id,Global.WAIT_TIME_IN_MS);
		}
		db.close();
	}

	
	/**
	 * 从返回字符串中获得app-id linux 
	 * @param ret
	 * @return
	 * 
	 */
	public static String parseReturnData(String ret) {
		/**
		*创建时间：2017年7月29日
		*输入
		*输出
		*/
		//StandaloneSchedulerBackend: Connected to Spark cluster with app ID app-20170729084629-0000
		String mesosMode="Framework registered with ";
		String standAloneMode="app ID ";
		String yarnMode="Submitting application "; //Submitting application application_1532181494552_0007 to ResourceManager
		if (ret==null) return "-1";
		int index=ret.indexOf(standAloneMode);
		if(index !=-1){
			index+=standAloneMode.length();
			ret=ret.substring(index);
			String[] list= ret.split("[ \t\n]+");
			ret=list[0];
			if (ret.equals("")) ret=list[1];
			return ret;
		}
		
		index=ret.indexOf(mesosMode);
		if (index !=-1){
			index=ret.indexOf(mesosMode)+mesosMode.length();
			ret=ret.substring(index);
			String[] list= ret.split("[ \t\n]+");
			ret=list[0];
			if (ret.equals("")) ret=list[1];
			return ret;
		}
		
		index=ret.indexOf(yarnMode);
		if (index !=-1){
			index=ret.indexOf(yarnMode)+yarnMode.length();
			ret=ret.substring(index);
			String[] list= ret.split("[ \t\n]+");
			ret=list[0];
			if (ret.equals("")) ret=list[1];
			return ret;
		}
		UIErr("没找到appid");
		return null;

	}

	/**
	 * @param options 
	 * @param term
	 * @param appArguments
	 * @param string
	 * spark-tune-submit [options] <app jar | python file> [app arguments]
	 * @throws InterruptedException 
	 * @throws IOException 
	 * 调用优化，并且将任务交个背后的shell 进行执行
	 * @throws JSONException 
	 */
	private static String callSparkSubmit(String app_md5,String className,ArrayList<String> options, String program, ArrayList<String> appArguments,
			String op) throws IOException, InterruptedException, JSONException {
		
		String command= "";
		
		
		ArrayList<String> optionsTuned=null;
				
				//优化执行，修改参数
				UIOutPut("[q]是否启用优化程序？请输入Y/N");
				Scanner s = new Scanner(System.in);
				String tmp = null;
				
				
	outerLable:
		while(s.hasNextLine()){
			tmp  = s.nextLine();
			switch (tmp) {
			case "Y":
			case "y":
			case "Yes":
			case "YES":
			case "yes":
				UIOutPut("启动优化模式");
				//使用优化参数
				optionsTuned=getTunedOptions(app_md5,className,op);
				if (className!=null) {
					optionsTuned.add(0, "--class "+className+" ");
				}
				if (sparkMode!=null){
					optionsTuned.add(1, "--master "+sparkMode+" ");
				};
				
				UIOutPut("选择123");
				int tmp2= Integer.parseInt(s.nextLine());
				switch (tmp2) {
				case 1:
					options=optionsTuned; //优化模式options=optionsTuned; //优化模式
					break;
				case 2:
					options=options;//默认模式
					break;
				case 3:
					String confJson= s.nextLine();
					JSONArray jsonArray= new JSONArray(confJson);
					options= new ArrayList<String>();
					if (className!=null) {
						options.add(0, "--class "+className+" ");
					}
					if (sparkMode!=null){
						options.add(1, "--master "+sparkMode+" ");
					};
					for (int i=0;i<jsonArray.length();i++){
						options.add("--conf "+jsonArray.getJSONObject(i).getString("name")+"="+jsonArray.getJSONObject(i).getString("value")+"\t");
					}
					break;
				default:
					break;
				}
				break outerLable;
			case "N":
			case "n":
			case "No":
			case "NO":
			case "no":
				UIOutPut("不启动优化模式");
				//啥都不干
				break outerLable;
				//重新询问
			default:
				UIOutPut("请输入Y/N");
				break;
			}
		}
		

				

			

		for (String str :options){
			command=command+str+" ";
		}
		//将命令拼好，然后提交给shell 执行
		command=command+program+" ";
		for (String str:appArguments){
			command=command+str+" ";
		}
		
		ArrayList<String> commandArray =new ArrayList<String>();
		commandArray.add("spark-submit "+command);
		ArrayList<String> retStr=LinuxShell.runInSequence(commandArray);
		if (retStr==null) {
			UIErr("没有拿到应用返回字符串");
			return null;
		}
		return retStr.get(0);
	}
	
	
	/**
	 *从调优模块哪里拿到优化之后的数据
	 * @param options
	 * 用户输入的参数实际上并没有得到应用
	 * @return
	 * @throws InterruptedException 
	 * @throws JSONException 
	 * @throws IOException 
	 */
	private static ArrayList<String> getTunedOptions(String app_md5,String className,String op) throws InterruptedException, IOException, JSONException {

		
		//启发式部分目前包括 
		//spark.executor.cores
		//spark.executor.memory
		//spark.task.cpus
		
		//spark.default.parallelism?
		
		//spark.driver.cores
		//spark.driver.memory
		//spark.default.parallelism
	
		HuristicConf conf= new HuristicConf(inputFileSize,sparkMode); //参数暂时没有用
		ArrayList<String> tmp= conf.toConfList();
		
		
		//灰盒模型暂时包括 --使用灰盒模型对应用进行建模--分conf区
		
		//shuffle read/shuffle write
		
		//网络带宽
		
	
		
		//是否要解压缩-计算平衡
		
		
		//梯度下降方法暂时包括 --使用梯度下降法进行训练和搜索部分分组的参数
		
		//
		
		
		GrayBoxConf grayboxConf=null;
		SparkConfiguration configuration=null;
		switch (op){
			case "direct":
				//仅仅使用启发式优化
				break;
			case "graybox":
				//使用黑盒优化模型

				grayboxConf =new GrayBoxConf(app_md5, className);
				configuration=grayboxConf.searchRRS();
				//将configuration 中使用启发式优化的部分进行赋值，仅仅是为了演示的时候统一返回json文件
				configuration.setHuristicConfs(conf);
				UIOutPut("[参数——JSON]");
				UIOutPut(configuration.toJsonString());
				
				ArrayList<String> res=configuration.toConfList(); //这一函数将group1+group2+group3 的 所有参数按照Linux-shell Spark 命令行的规范输出
				tmp.addAll(res);
				break;
			case "gradientDescent":
				grayboxConf =new GrayBoxConf(app_md5, className);
				configuration=grayboxConf.searchGradientDescent();
				ArrayList<String> res2=configuration.toConfList();
				tmp.addAll(res2);
				break;
			default:
				UIErr("出现问题");
		}
		
		

		
		
		return tmp;

		


	}


	/**
	 * @param path
	 * 注册一个新的应用，在application中新建一个aplication项
	 * 向数据库注册一个新的应用
	 * UNDONE
	 */
	private static void registe(String pathStr) {
		File localFile;
		
			//获取应用的hdfs路径
		
			//获取应用程序
		
			//获取应用数据量
		
			//提取应用功能sha1值
		
			//先提取应用的MD5值
			/**这里存在着这样的逻辑问题，如果同一个jar包，需要识别不同的class才能说明是相同的一个应用，否则的话同一个jar包种不同的可执行类是不同的程序
			*--class
			*而对于python文件而言，一个问题是Python文件之间存在的引用关系可能使得仅仅检测一个文件是不够的，其引用的文件如果发生变化也会出问题
			*/
		
	}


	/**
	 * Spark可以使用本地文件提交程序
	 * @return File 非空 代表本地磁盘路径下存在此文件，null代表不存在此文件
	 */
	private static File getFileOnLocalFileSystem(String pathStr){
		Path path = FileSystems.getDefault().getPath(pathStr);
		File aFile= new File(path.toString());
		return aFile;
	}


	//向用户输出
	public static void UIOutPut(String line) {
		System.out.println("[调优模块]  "+line);

		System.out.flush();// 必须通过flush将字节流清空
	}
	
	public void UIWarn(String line){
		System.err.println("NOT DEFINED");
		System.err.flush();// 必须通过flush将字节流清空
	}
	
	public static void UIErr(String line){
		System.err.println("[调优模块 错误]  "+line);
		System.err.flush();// 必须通过flush将字节流清空
	}
	
	//从用户接受输入
	//所用命令只需要一行
	public static String UIInPut(){
		return null;
     
	}
	
	
	
	/**
	 * 在文件系统下计算MD5值
	 * @param file
	 * @return
	 * @throws FileNotFoundException
	 */
	 public static String getMd5ByFile(File file) throws FileNotFoundException {  

         String value = null;  
         FileInputStream in = new FileInputStream(file);  
     try {  
         MappedByteBuffer byteBuffer = in.getChannel().map(FileChannel.MapMode.READ_ONLY, 0, file.length());  
         MessageDigest md5 = MessageDigest.getInstance("MD5");
         
//         System.err.println("FILE");
//         System.out.println(byteBuffer);
//         for ( int i =0;i<byteBuffer.limit();i++){
//        	 System.out.print(byteBuffer.get(i));
//         }
//         
        // System.out.println("");
         
         md5.update(byteBuffer); 
         BigInteger bi = new BigInteger(1, md5.digest());  
         value = bi.toString(16);  
     } catch (Exception e) {  
         e.printStackTrace();  
     } finally {  
             if(null != in) {  
                 try {  
                 in.close();  
             } catch (IOException e) {  
                 e.printStackTrace();  
             }  
         }  
     }  
     return value;  
     }  
 
	/**
	 * 
	 * @param pathStr
	 * @return
	 * @throws IOException 
	 * @throws NoSuchAlgorithmException 
	 * Null 如果没有根据文件路径找到文件 ，否则返回文件的MD5值
	 */
	 public static String getMd5ByPath(String pathStr) throws IOException, NoSuchAlgorithmException{
		File localFile;
		//如果文件在本地硬盘上，并且能打开
		if ((localFile=getFileOnLocalFileSystem(pathStr))!=null&&(localFile.exists())){
			return getMd5ByFile(localFile);
		}
		//如果文件在HDFS上能够找到，暂时屏蔽
		/*
		else if(HDFSFetcher_test.isFileOnHDFS(pathStr)){
			return HDFSFetcher_test.getHDFSMD5(pathStr);
		}
		*/
		//如果文件在HDFS和本地硬盘上都打不开，说明有问题
		else {
			UIErr("无法找到文件\" "+pathStr+"\"\t\""+pathStr+"\"既不是HDFS上的文件，也不是本地文件");
			return null;
		}
	}
	 
	
	 /**
	  * 
	  * @param str 需要校验的字符串
	  * @return MD5
	  * @throws NoSuchAlgorithmException
	  */
	 public static String getMD5ByString(String str) throws NoSuchAlgorithmException{
		 str="spark-submit file:///home/tonychao/spark-2.0.2-bin-hadoop2.6/README.md";
	    MessageDigest md5 = MessageDigest.getInstance("MD5");  
	    byte[] a= str.getBytes();

	    md5.update(a); 
	    BigInteger bi = new BigInteger(1, md5.digest());  
	    String value = bi.toString(16);
	    return value; 
	}
}
